Skip to content

feat(kinesis): Kinesis Data Streams Library#6516

Merged
jvh-aws merged 150 commits intomainfrom
feat/kinesis-data-streams-library
Mar 17, 2026
Merged

feat(kinesis): Kinesis Data Streams Library#6516
jvh-aws merged 150 commits intomainfrom
feat/kinesis-data-streams-library

Conversation

@ekjotmultani
Copy link
Copy Markdown
Member

Issue #, if available:

Description of changes:

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@ekjotmultani ekjotmultani requested a review from a team as a code owner January 22, 2026 17:21
@ekjotmultani ekjotmultani marked this pull request as draft January 26, 2026 19:13
@ekjotmultani ekjotmultani changed the title Feat/kinesis data streams library feat(kinesis): Kinesis Data Streams Library Feb 3, 2026
final Future<void> Function() _onFlush;
Timer? _timer;
bool _enabled = true;
bool _closed = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the benefit of having both _enabled and _closed?

Copy link
Copy Markdown
Member Author

@ekjotmultani ekjotmultani Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_closed can serve as a flag for other code to know whether or not the client itself is shut down or if just flushing is enabled or not. In our implementation they do similar things but it is useful for getters to know the internal state of the client I feel. What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both approaches are fine, but personally, I recommend merging them for the auto flush scheduler, since _closed is not really used.

});
});
});
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we ensure concurrent access to the record storage works via a test?

@ekjotmultani ekjotmultani force-pushed the feat/kinesis-data-streams-library branch 3 times, most recently from a21d70d to ff90989 Compare February 27, 2026 08:29
@ekjotmultani ekjotmultani marked this pull request as ready for review February 27, 2026 08:29
@ekjotmultani ekjotmultani force-pushed the feat/kinesis-data-streams-library branch 4 times, most recently from d9ec047 to 8ec0ec5 Compare March 4, 2026 06:14
/// {@template aws_kinesis_datastreams.kinesis_service_exception}
/// Thrown when a Kinesis SDK/API error occurs. Inspect [sdkException] for details.
/// {@endtemplate}
final class KinesisServiceException extends AmplifyKinesisException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we discussed that those errors are silently swallowed and not exposed.

/// Thrown when a single record exceeds the Kinesis per-record size limit
/// (10 MiB, partition key + data blob combined).
/// {@endtemplate}
final class KinesisRecordTooLargeException extends AmplifyKinesisException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have a specific type for records being too large, we should also have one for the partition key being too long.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, added a corresponding type

if (_closed) throw ClientClosedException();
if (!_enabled) return;

if (record.dataSize > kKinesisMaxRecordBytes) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also check the partition key length here. It should be in (0, 256]

''',
variables: [Variable.withInt(maxCount), Variable.withInt(maxBytes)],
readsFrom: {_db.kinesisRecords},
).get();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this query fails? In my understanding, currently, we'd swallow the error in flush's try-catch block. However, I think we would like to expose such errors as storage errors. Can we add logic to expose such errors as Kinesis* errors? Or am I missing something?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, good call, wrapped the failures in error object


/// Returns the current total size of cached data in bytes.
Future<int> getCurrentCacheSize() async {
final query = _db.selectOnly(_db.kinesisRecords)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, I commented this before: We can keep an integer for the cache size and update / check against that, when we add new records. We will only need to run a DB query after removing records. This allows us to avoid the extra query when recording, which is most likely the most frequent operation.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, done

@ekjotmultani ekjotmultani changed the base branch from main to feat/amplify-foundation-dart-minimal March 4, 2026 17:08
@ekjotmultani ekjotmultani force-pushed the feat/kinesis-data-streams-library branch from 5ffac53 to 11bb6e0 Compare March 5, 2026 15:04
@ekjotmultani ekjotmultani changed the base branch from feat/amplify-foundation-dart-minimal to main March 5, 2026 16:47
ekjotmultani and others added 8 commits March 5, 2026 08:49
Adds the foundational package with minimal constructs needed for
the Kinesis client libraries:
- AmplifyException base class
- AWSCredentialsProvider<T> and AWSCredentials sealed hierarchy
- Logger interface with AmplifyLogger implementation
- LogLevel enum
- Result<T, E> sealed type
Provides V2CredentialsProviderBridge to bridge aws_common (V2)
credentials to amplify_foundation_dart (V3) credentials.
Shared by kinesis client packages to avoid duplicating bridge logic.
jvh-aws and others added 19 commits March 13, 2026 14:50
* feat(kinesis): return RecordData from record() for cross-platform alignment\n\nChange record() return type from Result<void> to Result<RecordData>\nacross both the Dart client and Flutter wrapper. RecordData contains\nthe recorded entry size (data + partition key bytes), aligning with\nthe Android and Swift Kinesis client APIs.\n\nWhen the client is disabled, returns RecordData(recordSize: 0)."

* feat(kinesis): return RecordData from record() for cross-platform alignment\n\nChange record() return type from Result<void> to Result<RecordData>\nacross both the Dart client and Flutter wrapper. RecordData contains\nthe recorded entry size (data + partition key bytes), aligning with\nthe Android and Swift Kinesis client APIs.\n\nWhen the client is disabled, returns RecordData(recordSize: 0)."
refactor(kinesis): rename kinesis_data_streams_options.dart to amplify_kinesis_client_options.dart

Align the options file name with the client class name for consistency.

Co-authored-by: jvh-aws <jvhoff@amazon.de>
* Reapply "Align exceptions with Amplify v2 AmplifyException pattern\n\nSwitch AmplifyKinesisException to extend amplify_core's\nAmplifyException instead of amplify_foundation_dart's.\nAdopt positional message, optional recoverySuggestion,\nunderlyingException (was cause), const constructors,\nand runtimeTypeName overrides.""

This reverts commit f660c5f.

* Add const to ClientClosedException() call sites

---------

Co-authored-by: jvh-aws <jvhoff@amazon.de>
Expose library version in foundation, use it in kinesis user agent
* Expose library version in foundation, use it in kinesis user agent

* Fix flush strategy naming

* Apply autoformat

* Apply autoformat
* Expose library version in foundation, use it in kinesis user agent

* Fix flush strategy naming

* Apply autoformat

* Apply autoformat

* Rename package from aws_kinesis_datastreams to amplify_kinesis

* Fix analyzer
harsh62
harsh62 previously approved these changes Mar 16, 2026
@harsh62 harsh62 requested a review from cadivus March 16, 2026 17:34
Jonas Greifenhain and others added 2 commits March 16, 2026 18:53
Strip RecordData to an empty wrapper for now. Fields can be added
later once cross-platform alignment on the return shape is finalized.
@ekjotmultani ekjotmultani dismissed a stale review via b50ea0c March 16, 2026 18:26
jvh-aws and others added 2 commits March 16, 2026 23:07
* Minor doc fixes and remove reexport

* Adapt public facing docstrings
…6795)

* refactor: remove Category.kinesis enum, use string categories in deploy script

Decouple deploy_gen2.dart from the amplify_core Category enum by
changing AmplifyBackendGroup.category to a plain String. This allows
removing Category.kinesis from the enum and its dead switch case in
AmplifyClassImpl, since the kinesis client is standalone and does not
participate in the Amplify plugin registration system.

The --category CLI flag now derives allowed values from infraConfig
instead of Category.values.

* fix: match original Category.api.name casing ('API' not 'Api')
@jvh-aws jvh-aws requested review from cadivus, harsh62 and jvh-aws March 17, 2026 09:27
@jvh-aws jvh-aws merged commit 7f52a04 into main Mar 17, 2026
152 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants